{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 多进程最佳实践\n",
    "torch.multiprocessing 是Python multiprocessing 的替代品。它支持完全相同的操作，但扩展了它以便通过 multiprocessing.Queue 发送的所有张量将其数据移动到共享内存中，并且只会向其他进程发送一个句柄。\n",
    "<div class=\"alert alert-info\"><li>注意</li></div>\n",
    "<pre>当一个张量Tensor发送到另一个进程时， 张量Tensor数据将被共享。如果torch.Tensor.grad不是None，它也是共享的。在Tensor没有torch.Tensor.grad字段发送到另一个进程之后，它会创建一个标准的特定。grad Tensor于流程的流程，不会在所有流程中自动共享，这与 Tensor数据的共享方式不同。 </pre>\n",
    "这允许实现各种训练方法，如Hogwild，A3C或任何其他需要异步操作的方法。"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 共享CUDA张量\n",
    "\n",
    "仅在Python 3中使用 spawn 或 forkserver 启动方法才支持在进程之间共享CUDA张量。Python 2中的 multiprocessing 只能使用 fork 创建子进程，并且不被CUDA运行时所支持。 \n",
    "\n",
    "<div class=\"alert alert-warning\"><li>需要注意的地方</li></div>\n",
    "CUDA API要求导出到其他进程的分配，只要它们被使用就要一直保持有效。您应该小心，确保您共享的CUDA张量只要有必要就不要超出范围。这不是共享模型参数的问题， 但传递其他类型的数据应该小心。注意，此限制不适用于共享CPU内存。\n",
    "\n",
    "参考：使用 nn.DataParallel 替代 multiprocessing "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 最佳实践和提示 \n",
    "### 避免和抵制死锁\n",
    "\n",
    "当一个新进程被产生时，有很多事情可能会出错，最常见的死锁原因是后台线程。\n",
    "如果有任何线程持有锁或导入模块，并且 fork 被调用，则子进程很可能处于损坏的状态，并以不同的方式死锁或失败。\n",
    "\n",
    "注意，即使您没有，Python内置的库也可能会这样做 —— 不需要看得比 multiprocessing 更远。 \n",
    "\n",
    "multiprocessing.Queue 实际上是一个非常复杂的类，它产生用于序列化，发送和接收对象的多个线程，它们也可能引起上述问题。如果您发现自己处于这种情 况，请尝试使用 multiprocessing.queues.SimpleQueue ，这不会使用任何其他线程。\n",
    "\n",
    "我们正在竭尽全力把它设计得更简单，并确保这些死锁不会发生，但有些事情无法控制。如果有任何问题您无法一时无法解决，请尝试在论坛上提出，我们将看看是否可以解决问题"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### 重用经过队列的缓冲区\n",
    "\n",
    "记住每次将 Tensor 放入 multiprocessing.Queue 时，必须将其移动到共享内存中。\n",
    "\n",
    "如果它已经被共享，它是一个无效的操作，否则会产生一个额外的内存副本，这会减缓整个进程。\n",
    "\n",
    "即使你有一个进程池来发送数据到一个进程，使它返回缓冲区 —— 这几乎是免费的，并且允许你在 发送下一个batch时避免产生副本。\n",
    "\n",
    "### 异步多进程训练（例如Hogwild）\n",
    "\n",
    "使用 torch.multiprocessing ，可以异步地训练模型，参数可以一直共享，也可以定期同步。在第一种情况下，我们建议发送整个模型对象，而在后者中，我们建议只发送 state_dict() 。\n",
    "\n",
    "我们建议使用 multiprocessing.Queue 来在进程之间传递各种PyTorch对象。例如， 当使用fork 启动方法时，可能会继承共享内存中的张量和存储器，但这是非常容易出错的，应谨慎使用，而且只能由高级用户使用。\n",
    "\n",
    "队列虽然有时是一个较不优雅的解决方案，但基本上能在所 有情况下正常工作。\n",
    "\n",
    "<div class=\"alert alert-warning\"><li>需要注意的一点</li></div>\n",
    "你应该注意有关全局语句，它们没有被 if __name__ == '__main__' 保护。如果使用 与 fork 不同的启动方法，则它们将在所有子进程中执行。"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import torch.nn as nn\n",
    "import torch.nn.functional as F\n",
    "import torch.multiprocessing as mp\n",
    "\n",
    "class Net(nn.Module):\n",
    "    def __init__(self):\n",
    "        super(Net, self).__init__()\n",
    "        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)\n",
    "        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)\n",
    "        self.conv2_drop = nn.Dropout2d()\n",
    "        self.fc1 = nn.Linear(320, 50)\n",
    "        self.fc2 = nn.Linear(50, 10)\n",
    "\n",
    "    def forward(self, x):\n",
    "        x = F.relu(F.max_pool2d(self.conv1(x), 2))\n",
    "        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))\n",
    "        x = x.view(-1, 320)\n",
    "        x = F.relu(self.fc1(x))\n",
    "        x = F.dropout(x, training=self.training)\n",
    "        x = self.fc2(x)\n",
    "        return F.log_softmax(x, dim=1)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Hogwild\n",
    "# 在examples repository中可以找到具体的Hogwild实现，可以展示代码的整体结构。下面一个小例子(自己去找替换一个model)：\n",
    "\n",
    "import torch.multiprocessing as mp\n",
    "\n",
    "def train(model):\n",
    "    # Construct data_loader, optimizer, etc.\n",
    "    for data, labels in data_loader:\n",
    "        optimizer.zero_grad()\n",
    "        loss_fn(model(data), labels).backward()\n",
    "        optimizer.step()  # This will update the shared parameters\n",
    "\n",
    "if __name__ == '__main__':\n",
    "    num_processes = 4\n",
    "    model = Net()\n",
    "    # NOTE: this is required for the ``fork`` method to work\n",
    "    model.share_memory()\n",
    "    processes = []\n",
    "    for rank in range(num_processes):\n",
    "        p = mp.Process(target=train, args=(model,))\n",
    "        p.start()\n",
    "        processes.append(p)\n",
    "    for p in processes:\n",
    "        p.join()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.2"
  },
  "toc": {
   "base_numbering": 1,
   "nav_menu": {},
   "number_sections": true,
   "sideBar": true,
   "skip_h1_title": false,
   "title_cell": "Table of Contents",
   "title_sidebar": "Contents",
   "toc_cell": false,
   "toc_position": {
    "height": "calc(100% - 180px)",
    "left": "10px",
    "top": "150px",
    "width": "273.188px"
   },
   "toc_section_display": true,
   "toc_window_display": true
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
